{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Parallelization\n",
    "\n",
    "This tutorial covers advanced setups for parallelizing TPOT with Dask. If you just want to parallelize TPOT within a single computer with multiple processes, set the n_jobs parameter to the number of threads you want to use and skip this tutorial. \n",
    "\n",
    "TPOT uses Dask for parallelization and defaults to using a dask.distributed.LocalCluster for local parallelization. A user can pass in a custom Dask client or cluster for advanced usage. For example, a multi-node parallelization is possible using the dask-jobqueue package.\n",
    "\n",
    "\n",
    "TPOT can be easily parallelized on a local computer by setting the n_jobs and memory_limit parameters.\n",
    "\n",
    "`n_jobs` dictates how many dask workers to launch. In TPOT this corresponds to the number of pipelines to evaluate in parallel.\n",
    "\n",
    "`memory_limit` is the amount of RAM to use per worker. "
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### TPOT with Python Scripts\n",
    "\n",
    "When running tpot from an .py script, it is important to protect code with `if __name__==\"__main__\":`\n",
    "\n",
    "This is due to how parallelization is handled in Python. In short, when Python spawns new processes, each new process reimports code from the relevant .py files, including rerunning code. The context under `if __name__==\"__main__\":` ensures the code under it only executed by the main process and only once. More info [here](https://docs.dask.org/en/stable/scheduling.html#standalone-python-scripts)."
   ]
  },
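  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The guard described above can be sketched with the standard library alone. This is a minimal illustration, not TPOT code: `evaluate` and `main` are hypothetical names, and `concurrent.futures.ProcessPoolExecutor` stands in for the worker processes that Dask would launch. In a real script, the TPOT setup and the call to `est.fit` would presumably go inside `main`.\n",
    "\n",
    "```python\n",
    "from concurrent.futures import ProcessPoolExecutor\n",
    "\n",
    "\n",
    "def evaluate(x):\n",
    "    # Stand-in for the work done in a worker process (hypothetical helper).\n",
    "    return x * x\n",
    "\n",
    "\n",
    "def main():\n",
    "    # Because of the guard below, only the process that runs the script calls main().\n",
    "    # Worker processes that reimport this file define evaluate() but start no pool.\n",
    "    with ProcessPoolExecutor(max_workers=2) as pool:\n",
    "        results = list(pool.map(evaluate, range(4)))\n",
    "    print(results)\n",
    "    return results\n",
    "\n",
    "\n",
    "if __name__ == \"__main__\":\n",
    "    main()\n",
    "```\n",
    "\n",
    "Function definitions and imports stay at module level so that worker processes can find them; only the code that starts the parallel work needs to sit under the guard."
   ]
  },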
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/opt/anaconda3/envs/tpotenv/lib/python3.10/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n",
      "  from .autonotebook import tqdm as notebook_tqdm\n",
      "Generation: : 8it [01:00,  7.57s/it]\n",
      "/opt/anaconda3/envs/tpotenv/lib/python3.10/site-packages/sklearn/decomposition/_fastica.py:595: UserWarning: n_components is too large: it will be set to 4\n",
      "  warnings.warn(\n",
      "/opt/anaconda3/envs/tpotenv/lib/python3.10/site-packages/sklearn/linear_model/_sag.py:349: ConvergenceWarning: The max_iter was reached which means the coef_ did not converge\n",
      "  warnings.warn(\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "0.997905982905983\n"
     ]
    }
   ],
   "source": [
    "import tpot\n",
    "import sklearn\n",
    "import sklearn.datasets\n",
    "import numpy as np\n",
    "scorer = sklearn.metrics.get_scorer('roc_auc_ovr')\n",
    "X, y = sklearn.datasets.load_iris(return_X_y=True)\n",
    "X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)\n",
    "\n",
    "\n",
    "graph_search_space = tpot.search_spaces.pipelines.GraphSearchPipeline(\n",
    "    root_search_space= tpot.config.get_search_space([\"KNeighborsClassifier\", \"LogisticRegression\", \"DecisionTreeClassifier\"]),\n",
    "    leaf_search_space = tpot.config.get_search_space(\"selectors\"), \n",
    "    inner_search_space = tpot.config.get_search_space([\"transformers\"]),\n",
    "    max_size = 10,\n",
    "    )\n",
    "\n",
    "est = tpot.TPOTEstimator(\n",
    "    scorers = [\"roc_auc_ovr\"],\n",
    "    scorers_weights = [1],\n",
    "    classification = True,\n",
    "    cv = 10,\n",
    "    search_space = graph_search_space,\n",
    "    max_time_mins = 60,\n",
    "    verbose = 2,\n",
    "    n_jobs=16,\n",
    "    memory_limit=\"4GB\"\n",
    ")\n",
    "\n",
    "est.fit(X_train, y_train)\n",
    "print(scorer(est, X_test, y_test))"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Manual Dask Clients and Dashboard\n",
    "\n",
    "You can also manually initialize a dask client. This can be useful to gain additional control over the parallelization, debugging, as well as viewing a dashboard of the live performance of TPOT.\n",
    "\n",
    "You can find more details in the official [documentation here.](https://docs.dask.org/en/stable/)\n",
    "\n",
    "\n",
    "[Dask Python Tutorial](https://docs.dask.org/en/stable/deploying-python.html)\n",
    "[Dask Dashboard](https://docs.dask.org/en/stable/dashboard.html)\n",
    "\n",
    "\n",
    "Note that the if a client is passed in manually, TPOT will ignore n_jobs and memory_limit.\n",
    "If there is no client passed in, TPOT will ignore any global/existing client and create its own."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Initializing a basic dask local cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import Client, LocalCluster\n",
    "\n",
    "n_jobs = 4\n",
    "memory_limit = \"4GB\"\n",
    "\n",
    "cluster = LocalCluster(n_workers=n_jobs, #if no client is passed in and no global client exists, create our own\n",
    "                        threads_per_worker=1,\n",
    "                        memory_limit=memory_limit)\n",
    "client = Client(cluster)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Get the link to view the dask Dashboard. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'http://127.0.0.1:8787/status'"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "client.dashboard_link"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Generation: : 8it [01:01,  7.69s/it]\n",
      "/opt/anaconda3/envs/tpotenv/lib/python3.10/site-packages/sklearn/linear_model/_sag.py:349: ConvergenceWarning: The max_iter was reached which means the coef_ did not converge\n",
      "  warnings.warn(\n",
      "2025-02-21 16:37:55,843 - distributed.worker.state_machine - WARNING - Async instruction for <Task cancelled name=\"execute('eval_objective_list-8bdcf8a1c1f54374fc47664011238a6d')\" coro=<Worker.execute() done, defined at /opt/anaconda3/envs/tpotenv/lib/python3.10/site-packages/distributed/worker_state_machine.py:3607>> ended with CancelledError\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "0.997905982905983\n"
     ]
    }
   ],
   "source": [
    "graph_search_space = tpot.search_spaces.pipelines.GraphSearchPipeline(\n",
    "    root_search_space= tpot.config.get_search_space([\"KNeighborsClassifier\", \"LogisticRegression\", \"DecisionTreeClassifier\"]),\n",
    "    leaf_search_space = tpot.config.get_search_space(\"selectors\"), \n",
    "    inner_search_space = tpot.config.get_search_space([\"transformers\"]),\n",
    "    max_size = 10,\n",
    "    )\n",
    "\n",
    "est = tpot.TPOTEstimator(\n",
    "    client = client,\n",
    "    scorers = [\"roc_auc_ovr\"],\n",
    "    scorers_weights = [1],\n",
    "    classification = True,\n",
    "    cv = 10,\n",
    "    search_space = graph_search_space,\n",
    "    max_time_mins = 60,\n",
    "    early_stop=10,\n",
    "    verbose = 2,\n",
    ")\n",
    "\n",
    "\n",
    "# this is equivalent to: \n",
    "# est = tpot.TPOTClassifier(population_size= 8, generations=5, n_jobs=4, memory_limit=\"4GB\", verbose=1)\n",
    "est.fit(X_train, y_train)\n",
    "print(scorer(est, X_test, y_test))\n",
    "\n",
    "#It is good to close the client and cluster when you are done with them\n",
    "client.close()\n",
    "cluster.close()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Option 2\n",
    "\n",
    "You can initialize the cluster and client with a context manager that will automatically close them. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Generation: : 10it [01:00,  6.07s/it]\n",
      "/opt/anaconda3/envs/tpotenv/lib/python3.10/site-packages/sklearn/linear_model/_sag.py:349: ConvergenceWarning: The max_iter was reached which means the coef_ did not converge\n",
      "  warnings.warn(\n",
      "2025-02-21 16:38:57,976 - distributed.worker.state_machine - WARNING - Async instruction for <Task cancelled name=\"execute('eval_objective_list-87c6eded7038f6c8291a3ee9879aef3f')\" coro=<Worker.execute() done, defined at /opt/anaconda3/envs/tpotenv/lib/python3.10/site-packages/distributed/worker_state_machine.py:3607>> ended with CancelledError\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1.0\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2025-02-21 16:39:01,975 - distributed.nanny - WARNING - Worker process still alive after 4.0 seconds, killing\n"
     ]
    }
   ],
   "source": [
    "from dask.distributed import Client, LocalCluster\n",
    "import tpot\n",
    "import sklearn\n",
    "import sklearn.datasets\n",
    "import numpy as np\n",
    "\n",
    "scorer = sklearn.metrics.get_scorer('roc_auc_ovr')\n",
    "X, y = sklearn.datasets.load_iris(return_X_y=True)\n",
    "X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)\n",
    "\n",
    "\n",
    "n_jobs = 4\n",
    "memory_limit = \"4GB\"\n",
    "\n",
    "with LocalCluster(  \n",
    "    n_workers=n_jobs,\n",
    "    threads_per_worker=1,\n",
    "    memory_limit='4GB',\n",
    ") as cluster, Client(cluster) as client:\n",
    "    graph_search_space = tpot.search_spaces.pipelines.GraphSearchPipeline(\n",
    "        root_search_space= tpot.config.get_search_space([\"KNeighborsClassifier\", \"LogisticRegression\", \"DecisionTreeClassifier\"]),\n",
    "        leaf_search_space = tpot.config.get_search_space(\"selectors\"), \n",
    "        inner_search_space = tpot.config.get_search_space([\"transformers\"]),\n",
    "        max_size = 10,\n",
    "        )\n",
    "\n",
    "    est = tpot.TPOTEstimator(\n",
    "        client = client,\n",
    "        scorers = [\"roc_auc_ovr\"],\n",
    "        scorers_weights = [1],\n",
    "        classification = True,\n",
    "        cv = 5,\n",
    "        search_space = graph_search_space,\n",
    "        max_time_mins = 60,\n",
    "        early_stop=10,\n",
    "        verbose = 2,\n",
    "        )\n",
    "    est.fit(X_train, y_train)\n",
    "    print(scorer(est, X_test, y_test))"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Dask multi node parallelization on HPC\n",
    "\n",
    "Dask can parallelize across multiple nodes via job queueing systems. This is done using the Dask-Jobqueue package. More information can be found in the official [documentation here.]( https://jobqueue.dask.org/en/latest/)\n",
    "\n",
    "To parallelize TPOT with Dask-Jobqueue, simply pass in a client based on a Jobqueue cluster with desired settings into the client parameter. Each job will evaluate a single pipeline.\n",
    "\n",
    "Note that TPOT will ignore n_jobs and memory_limit as these should be set inside the Dask cluster. \n",
    "\n",
    "\n",
    "The following example is specific to the Sun Grid Engine. Other supported clusters can be found in the [Dask-Jobqueue documentation here](https://jobqueue.dask.org/en/latest/examples.html)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Sun Grid Engine is not installed. This example requires Sun Grid Engine to be installed.\n"
     ]
    }
   ],
   "source": [
    "from dask.distributed import Client, LocalCluster\n",
    "import sklearn\n",
    "import sklearn.datasets\n",
    "import sklearn.metrics\n",
    "import sklearn.model_selection\n",
    "import tpot\n",
    "from dask_jobqueue import SGECluster # or SLURMCluster, PBSCluster, etc. Replace SGE with your scheduler.\n",
    "import os\n",
    "\n",
    "if os.system(\"which qsub\") != 0:\n",
    "    print(\"Sun Grid Engine is not installed. This example requires Sun Grid Engine to be installed.\")\n",
    "else:\n",
    "    print(\"Sun Grid Engine is installed.\")\n",
    "\n",
    "    \n",
    "    cluster = SGECluster(\n",
    "        queue='all.q',\n",
    "        cores=2,\n",
    "        memory=\"50 GB\"\n",
    "\n",
    "    )\n",
    "\n",
    "    cluster.adapt(minimum_jobs=10, maximum_jobs=100)  # auto-scale between 10 and 100 jobs\n",
    "\n",
    "    client = Client(cluster)\n",
    "\n",
    "    scorer = sklearn.metrics.get_scorer('roc_auc_ovr')\n",
    "    X, y = sklearn.datasets.load_digits(return_X_y=True)\n",
    "    X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)\n",
    "\n",
    "    graph_search_space = tpot.search_spaces.pipelines.GraphSearchPipeline(\n",
    "    root_search_space= tpot.config.get_search_space([\"KNeighborsClassifier\", \"LogisticRegression\", \"DecisionTreeClassifier\"]),\n",
    "    leaf_search_space = tpot.config.get_search_space(\"selectors\"), \n",
    "    inner_search_space = tpot.config.get_search_space([\"transformers\"]),\n",
    "    max_size = 10,\n",
    "    )\n",
    "\n",
    "    est = tpot.TPOTEstimator(\n",
    "        client = client,\n",
    "        scorers = [\"roc_auc\"],\n",
    "        scorers_weights = [1],\n",
    "        classification = True,\n",
    "        cv = 10,\n",
    "        search_space = graph_search_space,\n",
    "        max_time_mins = 60,\n",
    "        early_stop=10,\n",
    "        verbose = 2,\n",
    "        )\n",
    "    est.fit(X_train, y_train)\n",
    "    # this is equivalent to: \n",
    "    # est = tpot.TPOTClassifier(population_size= 8, generations=5, n_jobs=4, memory_limit=\"4GB\", verbose=1)\n",
    "    est.fit(X_train, y_train)\n",
    "    print(scorer(est, X_test, y_test))\n",
    "\n",
    "    #It is good to close the client and cluster when you are done with them\n",
    "    client.close()\n",
    "    cluster.close()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "tpotenv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.16"
  },
  "orig_nbformat": 4
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
